# chat/models.py
from django.db import models
# Create your models here.
class Quote(models.Model):
    """A stored quote record for one symbol at one point in time.

    Instances are persisted in the ``quote`` database table (see ``Meta``).
    """

    # When the quote was taken and which instrument it belongs to.
    datetime = models.DateTimeField()
    symbol = models.CharField(max_length=30)
    name = models.CharField(max_length=30)

    # Price figures carried by the quote.
    referencePrice = models.FloatField()
    openPrice = models.FloatField()
    highPrice = models.FloatField()
    lowPrice = models.FloatField()
    closePrice = models.FloatField()
    avgPrice = models.FloatField()

    # Size of the most recent trade.
    lastSize = models.IntegerField()

    class Meta:
        # Use an explicit table name instead of Django's "<app>_<model>" default.
        db_table = 'quote'

    def __str__(self):
        """Return ``datetime,symbol,name`` joined by commas."""
        return '{},{},{}'.format(self.datetime, self.symbol, self.name)
python3 manage.py makemigrations
預期看到
python3 manage.py migrate
預期看到
#chat/fugle.py
import time
from fugle_marketdata import WebSocketClient, RestClient
import asyncio
from asgiref.sync import async_to_sync, sync_to_async
import channels.layers
import django, sys
from os.path import join, dirname, abspath
from os import environ
from datetime import datetime
# ---------------------- django setting --------------------------
# This script is run directly (not through manage.py), so Django has to be
# bootstrapped by hand before any model can be imported.
# PROJECT_DIR is the directory two levels above this file; putting it first on
# sys.path makes the project's packages importable.
PROJECT_DIR = dirname(dirname(abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)
# Set the correct path to your settings module
# (setdefault keeps any DJANGO_SETTINGS_MODULE already present in the environment).
environ.setdefault("DJANGO_SETTINGS_MODULE", "django_channels.settings")
# All django stuff has to come after the setup:
django.setup()
# ---------------------- django setting --------------------------
# Channel layer handle obtained from the configured Channels backend.
channel_layer = channels.layers.get_channel_layer()
# Fugle API key. NOTE(review): 'XXX' looks like a placeholder - replace it with
# a real key (preferably read from the environment) before running.
YOUR_API_KEY = 'XXX'
# Model import is deliberately placed after django.setup().
from chat.models import Quote
def main(symbol='2330', interval=5):
    """Poll the Fugle REST intraday quote and store each new quote.

    The loop sleeps ``interval`` seconds, fetches the intraday quote for
    ``symbol`` and writes one ``Quote`` row whenever the quote's ``closeTime``
    differs from the one seen on the previous iteration. It never returns;
    stop it by interrupting the process.

    Args:
        symbol: Symbol passed to ``stock.intraday.quote``. Defaults to
            ``'2330'``, the value that used to be hard-coded.
        interval: Seconds to sleep before each request. Defaults to ``5``.
    """
    # RestClient is imported at module level; no function-local import needed.
    stock = RestClient(api_key=YOUR_API_KEY).stock
    last_seen = None
    while True:
        time.sleep(interval)
        data = stock.intraday.quote(symbol=symbol)
        # closeTime is scaled down by 10**6 before being handed to
        # fromtimestamp(), which expects seconds.
        dt = datetime.fromtimestamp(data['closeTime'] / 10**6)
        if dt == last_seen:
            # Same closeTime as the previous poll: nothing new to store.
            continue
        last_seen = dt
        # objects.create() builds AND saves the row, so no extra .save() call
        # is chained here (it would issue a second, redundant write).
        Quote.objects.create(
            datetime=dt,
            symbol=data['symbol'],
            name=data['name'],
            referencePrice=data['referencePrice'],
            openPrice=data['openPrice'],
            highPrice=data['highPrice'],
            lowPrice=data['lowPrice'],
            closePrice=data['closePrice'],
            avgPrice=data['avgPrice'],
            lastSize=data['lastSize'],
        )


if __name__ == '__main__':
    main()
python3 chat/fugle.py
預期上述要能達到的功能是定時(5秒) 呼叫一次查詢股價的功能,並且存至 Quote 這個模型裡
# chat/consumers.py
import json
from datetime import datetime
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from chat.models import Product, Quote
from fugle_marketdata import RestClient
# Fugle API key handed to RestClient in ChatConsumer.connect().
# NOTE(review): 'XXX' looks like a placeholder - supply a real key before running.
api_key = 'XXX'
#
class ChatConsumer(WebsocketConsumer):
    """WebSocket consumer for a per-symbol stock chat room.

    The room name comes from the URL route and is also used as the symbol for
    ``Quote`` lookups. On connect the consumer joins the ``chat_<room_name>``
    group and broadcasts a welcome message followed by the latest stored
    quote. Sending the text ``查詢目前價格`` returns the latest stored quote to
    the requesting client only.
    """

    def connect(self):
        """Join the room group, accept the socket and broadcast the greeting."""
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        client = RestClient(api_key=api_key)
        self.stock = client.stock
        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name, self.channel_name
        )
        self.accept()
        message = f'\n歡迎來到 {self.room_name} 股票即時推播聊天室, 請輸入"查詢目前價格" 關鍵字\n'
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )
        # Follow the greeting with the most recent stored quote for this room.
        message = '--------------------------------\n'
        message += self._latest_quote_line()
        message += '--------------------------------\n'
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )

    def disconnect(self, close_code):
        """Leave the room group when the socket closes."""
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name, self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data=None, bytes_data=None):
        """Answer the ``查詢目前價格`` query with the latest stored quote.

        Args:
            text_data: JSON text frame; expected to be an object with a
                ``message`` key.
            bytes_data: Binary frame payload. Accepted so that binary frames
                do not raise ``TypeError``; its content is ignored.

        Frames that are empty, not valid JSON, not a JSON object, or whose
        ``message`` is anything other than the query keyword are ignored.
        """
        if not text_data:
            return
        try:
            text_data_json = json.loads(text_data)
        except json.JSONDecodeError:
            # Malformed client input must not tear down the connection.
            return
        if not isinstance(text_data_json, dict):
            return
        if text_data_json.get('message') != '查詢目前價格':
            return
        message = f'查詢字串: {text_data_json["message"]}\n'
        message += self._latest_quote_line()
        self.send(text_data=json.dumps({"message": message}))

    def chat_message(self, event):
        """Relay a ``chat.message`` group event to this WebSocket client.

        ``connect`` sends events of type ``chat.message`` to the group;
        Channels dispatches them to a method of this name, so it must exist.
        """
        self.send(text_data=json.dumps({"message": event["message"]}))

    def _latest_quote_line(self):
        """Return one text line describing the newest ``Quote`` for this room.

        Falls back to a "no quote yet" line when the table holds no row for
        the room's symbol, instead of failing on ``None``.
        """
        obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()
        if obj is None:
            return f"{self.room_name} 目前尚無報價資料\n"
        return (
            f"{obj.datetime}, {self.room_name} {obj.name}, "
            f"現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
        )
python3 manage.py runserver 8000
在 consumers.py 裡實作登入時回應現在股價,以及查詢現在價格功能
不想要查詢,只想登入後看到推播,只能從 Websocket 來著手
要看到效果限制在盤中時段
使用下圖功能
#chat/fugle_websocket.py
import time
from fugle_marketdata import WebSocketClient
import asyncio
from asgiref.sync import async_to_sync, sync_to_async
import channels.layers
import django, sys
from os.path import join, dirname, abspath
from os import environ
import json
# ---------------------- django setting --------------------------
# This script is run directly (not through manage.py), so Django has to be
# bootstrapped by hand before the channel layer can be used.
# PROJECT_DIR is the directory two levels above this file; putting it first on
# sys.path makes the project's packages importable.
PROJECT_DIR = dirname(dirname(abspath(__file__)))
sys.path.insert(0, PROJECT_DIR)
# Set the correct path to your settings module
# (setdefault keeps any DJANGO_SETTINGS_MODULE already present in the environment).
environ.setdefault("DJANGO_SETTINGS_MODULE", "django_channels.settings")
# All django stuff has to come after the setup:
django.setup()
# ---------------------- django setting --------------------------
# Channel layer handle used by handle_message() to reach the chat groups.
channel_layer = channels.layers.get_channel_layer()
# Fugle API key. NOTE(review): 'xxx' looks like a placeholder - replace it with
# a real key (preferably read from the environment) before running.
YOUR_API_KEY = 'xxx'
def handle_message(message):
    """Forward a ``data`` event from the Fugle WebSocket to its chat group.

    The raw message is printed, then parsed as JSON. Events whose ``event``
    field is not ``data`` are dropped. For ``data`` events the whole parsed
    message is sent to the ``chat_<symbol>`` group as a ``trades.message``
    event.

    Args:
        message: JSON text received from the WebSocket client.
    """
    print(message)
    payload = json.loads(message)
    if payload['event'] != 'data':
        return
    group_name = f"chat_{payload['data']['symbol']}"
    group_event = {'type': 'trades.message', "message": payload}
    async_to_sync(channel_layer.group_send)(group_name, group_event)
async def main():
    """Connect to the Fugle stock WebSocket and subscribe to 2330 trades.

    ``handle_message`` is registered as the ``message`` listener before the
    connection is awaited, then the ``trades`` channel for symbol ``2330`` is
    subscribed.
    """
    ws_client = WebSocketClient(api_key=YOUR_API_KEY)
    stock_feed = ws_client.stock
    stock_feed.on('message', handle_message)
    await stock_feed.connect()
    subscription = {'channel': 'trades', 'symbol': '2330'}
    stock_feed.subscribe(subscription)


if __name__ == '__main__':
    asyncio.run(main())
python3 chat/fugle_websocket.py
預期看到
# chat/consumers.py
import json
from datetime import datetime
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from chat.models import Product, Quote
from fugle_marketdata import RestClient
# Fugle API key handed to RestClient in ChatConsumer.connect().
# NOTE(review): 'xxx' looks like a placeholder - supply a real key before running.
api_key = 'xxx'
#
class ChatConsumer(WebsocketConsumer):
    """WebSocket consumer for a per-symbol stock chat room with live trades.

    The room name comes from the URL route and is also used as the symbol for
    ``Quote`` lookups. On connect the consumer joins the ``chat_<room_name>``
    group and broadcasts a welcome message followed by the latest stored
    quote. Sending the text ``查詢目前價格`` returns the latest stored quote to
    the requesting client only. ``trades.message`` group events are relayed
    to the client as formatted trade lines.
    """

    def connect(self):
        """Join the room group, accept the socket and broadcast the greeting."""
        self.room_name = self.scope["url_route"]["kwargs"]["room_name"]
        self.room_group_name = f"chat_{self.room_name}"
        client = RestClient(api_key=api_key)
        self.stock = client.stock
        # Join room group
        async_to_sync(self.channel_layer.group_add)(
            self.room_group_name, self.channel_name
        )
        self.accept()
        message = f'\n歡迎來到 {self.room_name} 股票即時推播聊天室, 請輸入"查詢目前價格" 關鍵字\n'
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )
        # Follow the greeting with the most recent stored quote for this room.
        message = '--------------------------------\n'
        message += self._latest_quote_line()
        message += '--------------------------------\n'
        async_to_sync(self.channel_layer.group_send)(
            self.room_group_name, {"type": "chat.message", "message": message}
        )

    def disconnect(self, close_code):
        """Leave the room group when the socket closes."""
        # Leave room group
        async_to_sync(self.channel_layer.group_discard)(
            self.room_group_name, self.channel_name
        )

    # Receive message from WebSocket
    def receive(self, text_data=None, bytes_data=None):
        """Answer the ``查詢目前價格`` query with the latest stored quote.

        Args:
            text_data: JSON text frame; expected to be an object with a
                ``message`` key.
            bytes_data: Binary frame payload. Accepted so that binary frames
                do not raise ``TypeError``; its content is ignored.

        Frames that are empty, not valid JSON, not a JSON object, or whose
        ``message`` is anything other than the query keyword are ignored.
        """
        if not text_data:
            return
        try:
            text_data_json = json.loads(text_data)
        except json.JSONDecodeError:
            # Malformed client input must not tear down the connection.
            return
        if not isinstance(text_data_json, dict):
            return
        if text_data_json.get('message') != '查詢目前價格':
            return
        message = f'查詢字串: {text_data_json["message"]}\n'
        message += self._latest_quote_line()
        self.send(text_data=json.dumps({"message": message}))

    def chat_message(self, event):
        """Relay a ``chat.message`` group event to this WebSocket client.

        ``connect`` sends events of type ``chat.message`` to the group;
        Channels dispatches them to a method of this name, so it must exist.
        """
        self.send(text_data=json.dumps({"message": event["message"]}))

    def trades_message(self, event):
        """Relay a ``trades.message`` group event as a formatted trade line.

        ``event["message"]`` is read as a mapping whose ``data`` entry carries
        ``time``, ``price`` and ``volume``.
        """
        data = event["message"]
        trade = data['data']
        # The time value is scaled down by 10**6 before being handed to
        # fromtimestamp(), which expects seconds.
        dt = datetime.fromtimestamp(trade['time'] / 10**6)
        # Send message to WebSocket
        msg = f"{dt}, {self.room_name} 現在價格: {trade['price']} 總量: {trade['volume']}\n"
        self.send(text_data=json.dumps({"message": msg}))

    def _latest_quote_line(self):
        """Return one text line describing the newest ``Quote`` for this room.

        Falls back to a "no quote yet" line when the table holds no row for
        the room's symbol, instead of failing on ``None``.
        """
        obj = Quote.objects.filter(symbol=self.room_name).order_by('-datetime').first()
        if obj is None:
            return f"{self.room_name} 目前尚無報價資料\n"
        return (
            f"{obj.datetime}, {self.room_name} {obj.name}, "
            f"現在價格: {obj.closePrice} 量: {obj.lastSize}\n"
        )
注意看可發現這裡加上 trades_message 的函式,用來接收即時訊息
python3 manage.py runserver 8000
在 consumers.py 裡實作登入時回應現在股價,以及查詢現在價格功能 + 接收即時推播資訊並且顯示
完成